{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Using Tensorflow DALI plugin: DALI tf.data.Dataset with multiple GPUs\n",
    "\n",
    "### Overview\n",
    "\n",
    "This notebook is a comprehensive example on how to use DALI `tf.data.Dataset` with multiple GPUs. It is recommended to look into [single GPU example](tensorflow-dataset.ipynb) first to get up to speed with DALI dataset and how it can be used to train a neural network with custom model and training loop. This example is an extension of the single GPU version.\n",
    "\n",
    "Initially we define some parameters of the training and to create a DALI pipeline to read [MNIST](http://yann.lecun.com/exdb/mnist/) converted to LMDB format. You can find it in [DALI_extra](https://github.com/NVIDIA/DALI_extra) dataset. This pipeline is able to partition the dataset into multiple shards.\n",
    "\n",
    "`DALI_EXTRA_PATH` environment variable should point to the place where data from [DALI extra repository](https://github.com/NVIDIA/DALI_extra) is downloaded. Please make sure that the proper release tag is checked out."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 1,
   "metadata": {},
   "outputs": [],
   "source": [
    "import nvidia.dali as dali\n",
    "from nvidia.dali.pipeline import Pipeline\n",
    "import nvidia.dali.ops as ops\n",
    "import nvidia.dali.types as types\n",
    "\n",
    "import os\n",
    "\n",
    "import nvidia.dali.plugin.tf as dali_tf\n",
    "import tensorflow.compat.v1 as tf\n",
    "\n",
    "tf.logging.set_verbosity(tf.logging.ERROR)\n",
    "tf.disable_eager_execution()\n",
    "tf.reset_default_graph()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Path to MNIST dataset\n",
    "data_path = os.path.join(os.environ['DALI_EXTRA_PATH'], 'db/MNIST/training/')\n",
    "\n",
    "BATCH_SIZE = 64\n",
    "DROPOUT = 0.2\n",
    "IMAGE_SIZE = 28\n",
    "NUM_CLASSES = 10\n",
    "HIDDEN_SIZE = 128\n",
    "EPOCHS = 5\n",
    "ITERATIONS = 100\n",
    "NUM_DEVICES = 2"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "metadata": {},
   "outputs": [],
   "source": [
    "class MnistPipeline(Pipeline):\n",
    "    def __init__(\n",
    "        self, batch_size, device_id=0, shard_id=0, num_shards=1, num_threads=4, seed=0):\n",
    "        super(MnistPipeline, self).__init__(\n",
    "            batch_size, num_threads, device_id, seed)\n",
    "        self.reader = ops.Caffe2Reader(\n",
    "            path=data_path, random_shuffle=True, shard_id=0, num_shards=num_shards)\n",
    "        self.decode = ops.ImageDecoder(\n",
    "            device='mixed',\n",
    "            output_type=types.GRAY)\n",
    "        self.cmn = ops.CropMirrorNormalize(\n",
    "            device='gpu',\n",
    "            output_dtype=types.FLOAT,\n",
    "            image_type=types.GRAY,\n",
    "            std=[255.],\n",
    "            output_layout=\"CHW\")\n",
    "\n",
    "    def define_graph(self):\n",
    "        inputs, labels = self.reader(name=\"Reader\")\n",
    "        images = self.decode(inputs)\n",
    "        labels = labels.gpu()\n",
    "        images = self.cmn(images)\n",
    "\n",
    "        return (images, labels)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Next we create some parameters needed for the DALI dataset. For more details on what they are you can look into [single GPU example](tensorflow-dataset.ipynb)."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "metadata": {},
   "outputs": [],
   "source": [
    "options = tf.data.Options()\n",
    "options.experimental_optimization.apply_default_optimizations = False\n",
    "options.experimental_optimization.autotune = False\n",
    "\n",
    "\n",
    "shapes = [\n",
    "    (BATCH_SIZE, IMAGE_SIZE, IMAGE_SIZE),\n",
    "    (BATCH_SIZE)]\n",
    "dtypes = [\n",
    "    tf.float32,\n",
    "    tf.int32]"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "As we utilize more than one GPU for this training, we use the function below to average gradient between the devices."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 5,
   "metadata": {},
   "outputs": [],
   "source": [
    "# This function is copied form: https://github.com/tensorflow/models/blob/master/tutorials/image/cifar10/cifar10_multi_gpu_train.py#L102\n",
    "def average_gradients(tower_grads):\n",
    "    average_grads = []\n",
    "    for grad_and_vars in zip(*tower_grads):\n",
    "        # Note that each grad_and_vars looks like the following:\n",
    "        #   ((grad0_gpu0, var0_gpu0), ... , (grad0_gpuN, var0_gpuN))\n",
    "        grads = []\n",
    "        for g, _ in grad_and_vars:\n",
    "            # Add 0 dimension to the gradients to represent the tower.\n",
    "            expanded_g = tf.expand_dims(g, 0)\n",
    "\n",
    "            # Append on a 'tower' dimension which we will average over below.\n",
    "            grads.append(expanded_g)\n",
    "\n",
    "        # Average over the 'tower' dimension.\n",
    "        grad = tf.concat(grads, 0)\n",
    "        grad = tf.reduce_mean(grad, 0)\n",
    "\n",
    "        # Keep in mind that the Variables are redundant because they are shared\n",
    "        # across towers. So .. we will just return the first tower's pointer to\n",
    "        # the Variable.\n",
    "        v = grad_and_vars[0][1]\n",
    "        grad_and_var = (grad, v)\n",
    "        average_grads.append(grad_and_var)\n",
    "    return average_grads"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Now we are ready to define the model. Note that one instance of the DALI dataset is created per GPU. Each instance reads only the part of the MNIST dataset assinged with `shard_id` parameter of the wrapped pipeline."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 6,
   "metadata": {},
   "outputs": [],
   "source": [
    "iterator_initializers = []\n",
    "\n",
    "with tf.device('/cpu:0'):\n",
    "    tower_grads = []\n",
    "\n",
    "    for i in range(NUM_DEVICES):\n",
    "        with tf.device('/gpu:{}'.format(i)):\n",
    "            daliset = dali_tf.DALIDataset(\n",
    "                pipeline=MnistPipeline(\n",
    "                    BATCH_SIZE, device_id=i, shard_id=i, num_shards=NUM_DEVICES),\n",
    "                batch_size=BATCH_SIZE,\n",
    "                shapes=shapes,\n",
    "                dtypes=dtypes,\n",
    "                device_id=i).with_options(options)\n",
    "\n",
    "            iterator = tf.data.make_initializable_iterator(daliset)\n",
    "            iterator_initializers.append(iterator.initializer)\n",
    "            images, labels = iterator.get_next()\n",
    "\n",
    "            labels = tf.reshape(\n",
    "                tf.one_hot(labels, NUM_CLASSES),\n",
    "                [BATCH_SIZE, NUM_CLASSES])\n",
    "\n",
    "            with tf.variable_scope('mnist_net', reuse=(i != 0)):\n",
    "                images = tf.layers.flatten(images)\n",
    "                images = tf.layers.dense(images, HIDDEN_SIZE, activation=tf.nn.relu)\n",
    "                images = tf.layers.dropout(images, rate=DROPOUT, training=True)\n",
    "                images = tf.layers.dense(images, NUM_CLASSES, activation=tf.nn.softmax)\n",
    "\n",
    "            logits_train = images\n",
    "\n",
    "            loss_op = tf.reduce_mean(tf.nn.softmax_cross_entropy_with_logits(\n",
    "                logits=logits_train, labels=labels))\n",
    "            optimizer = tf.train.AdamOptimizer()\n",
    "            grads = optimizer.compute_gradients(loss_op)\n",
    "\n",
    "            if i == 0:\n",
    "                correct_pred = tf.equal(\n",
    "                    tf.argmax(logits_train, 1), tf.argmax(labels, 1))\n",
    "                accuracy = tf.reduce_mean(\n",
    "                    tf.cast(correct_pred, tf.float32))\n",
    "\n",
    "            tower_grads.append(grads)\n",
    "\n",
    "    tower_grads = average_gradients(tower_grads)\n",
    "    train_step = optimizer.apply_gradients(tower_grads)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Everything is now ready to run the training. "
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 7,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Step 0, accuracy: 0.109375\n",
      "Step 100, accuracy: 0.875\n",
      "Step 200, accuracy: 0.84375\n",
      "Step 300, accuracy: 0.953125\n",
      "Step 400, accuracy: 0.90625\n",
      "Final accuracy:  0.915\n"
     ]
    }
   ],
   "source": [
    "with tf.Session() as sess:\n",
    "        sess.run(tf.global_variables_initializer())\n",
    "        sess.run(iterator_initializers)\n",
    "\n",
    "        for i in range(EPOCHS * ITERATIONS):\n",
    "            sess.run(train_step)\n",
    "            if i % ITERATIONS == 0:\n",
    "                train_accuracy = sess.run(accuracy)\n",
    "                print(\"Step %d, accuracy: %g\" % (i, train_accuracy))\n",
    "\n",
    "        final_accuracy = 0\n",
    "        for _ in range(ITERATIONS):\n",
    "            final_accuracy = final_accuracy + \\\n",
    "                sess.run(accuracy)\n",
    "        final_accuracy = final_accuracy / ITERATIONS\n",
    "\n",
    "        print('Final accuracy: ', final_accuracy)"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.6.9"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}
